之前,介绍了folly的异步框架,其调度中实际执行任务时由线程池完成。听说Facebook目前已经将内部的future
替换成协程coro
了,恰巧工作中有相关协程使用的讨论,于是看一下folly中基于C++20协程封装的框架。这里不介绍C++20协程的基本使用方式,想要了解可以看下面两个文档。
基础背景
协程相较于线程,其性能更优,由开发人员自己实现任务切换,而不用操作系统进行切换,避免了操作系统线程切换的开销。同时,协程提供了新的开发范式,协程可以看做一个天然的动态DAG调度框架,当某段处理逻辑计算依赖某个数据时,我们可以通过协程切换,先去获取到对应数据,等拿到对应数据后在切换回原逻辑继续执行。对于在执行过程中才能判断是否要执行的数据,对于静态图来说,其支持较为困难,可能需要在图中增加动态的disable逻辑,而使用协程,其天然支持动态决策,灵活性更高。
例如如下一个简单逻辑:
1 | def Function(): |
对于如上逻辑,其含义是,Function的执行依赖了三个函数FunctionA
,FunctionB
,FunctionC
。其中对于B,C函数来说,其执行依赖于A的结果,对于静态构图,可能的形式为:
1 | FunctionA |
在FunctionB
和FunctionC
中根据FunctionA
的值进行判断,来决定是否执行。
其实现方式较为繁琐,需要将一个节点拆分成为多个节点。
使用协程泽不需要如此繁琐,其实现如上面的伪代码基本一致,可能变成如下形式:
1 | def Function(): |
当我们需要某个数据时,使用协程切换,将执行逻辑切换到对应的数据获取方法上即可,当取回数据后,再回到原函数中继续进行处理。处理协程函数也是放到一个大的线程池中处理。这样,我们将要获取的数据全都直接丢到线程池中,由协程调度来自动寻找其依赖的函数,自动丢到线程池中,这样就只需要一个线程池,一个协程调度,就完美的实现了一个动态dag。
这里还存在一些问题,会在后续讲解中逐步回答:
- 如果一个节点被多个算子依赖,如何避免被重复计算。
- 一个节点依赖多个数据,如果每一次执行到要使用的位置在切换协程获取,那会导致每个字段获取串行执行,如何让依赖尽可能并发执行(这个其实和动态图有一定的冲突,但往往是一个强需求)。
Task使用
folly
实现的coro核心是Task类,folly官方文档上有十分详细的介绍,这里只贴出来一个使用样例:
1 |
|
执行结果:
1 | $ ./a.out |
可以看到调用sycn
时,其输出严格有序,调用asycn
时,其输出就是无序的了。这里说明了一个问题是,调用collectAllRange
时,如果task本身是同步方法,则其会被串行调用,如果其本身是异步方法,则调用就会异步执行。
同时可以看到调用异步方法asycn
时,在co_await
前后执行执行在相同的线程池,虽然我们设置了co_await
等待的task
在另外的线程池执行。这是因为Task
的promise_type
的await_transform
方法调用了co_viaIfAsync
,保证协程始终在指定线程池中执行。当await_suspend
返回void
或者false
时,会立即返回给协程函数的调用者。同时协程处于suspend
状态。按照如此逻辑,上面实例代码,在执行async
函数内部逻辑时(即async
协程第一次被resume
,即被co_await
时),在调用co_await
方法前,逻辑都执行在主线程中,当调用co_await
时,直接返回到主流程中开始执行下面的语句了,而async
协程被挂起,被co_awiat
的协程被分配到线程池中执行,在这些协程执行结束后,重新唤醒async
协程,由于co_viaIfAsync
方法封装了一层协程,保证被唤醒的async
协程依然在原线程池中执行。
这里想要说明的一点是,线程池执行协程函数时,如果被suspend
而未拉起其他协程的协程(当await_suspend
返回coroutine handle
时,会立即执行coroutine handle
对应的协程,原协程被挂起,至于执行完成新的协程后的处理逻辑,则由新的协程处理函数决定,可以选择恢复原协程,也可以选择再拉起一个协程,或者什么都不干。相当于使用新的协程上下文替换原协程的上下文,新协程执行逻辑和原协程无关,执行完成也不存在要返回到某个原协程的什么位置的概念),并不会占用线程池,因为被suspend的协程函数,会立即返回到调用处(不是返回到调用co_await
的地方,而是协程函数的入口位置),在task中,一般是回到resumeCoroutineWithNewAsyncStackRoot
函数中的h.resume();
,这样线程就会认为执行完成了该task,会继续从线程池的task任务池中取其他的task。而未完成的协程调用什么时候继续呢,会在线程池调用的某个方法中调用被暂停协程的resume
方法时被继续执行。这里也说明一个问题,应该尽可能的避免协程被suspend而不拉起新的协程,因为当出现这种情况时,线程池会需要从task队列中查找新的任务,这势必会造成额外的开销,相当于协程的链式调度切换失效了。不论怎么样,当线程池中执行的都是协程函数时,可以大大减少线程池数量,理论上来说,线程池数量和cpu核数绑定即可。
这里还有一个对协程锁的验证,可以看到,程序最终会卡住,这时因为执行getB()
函数时,我们获取了锁,在没有释放的前提下协程被切换到了执行getA()
函数,这里再次尝试获取锁,这就造成了死锁。这里说明了,协程锁不能解决由于协程切换造成的死锁问题,使用协程锁,更需要考虑死锁问题,要保证协程切换时锁要被正常释放。
对于这个例子,可以仅仅简单运行一下,当前不必深究,当完整了解coro实现后可以回过来来再看一下。
Task
包含了大量的基础类,这里我们进行逐一介绍。
TaskPromiseBase
TaskPromiseBase
是Task的promise_type
的基类,其决定了返回Task协程函数的实际执行逻辑。因此先介绍其具体实现。
TaskPromiseBase
类包含如下成员:
1 | class TaskPromiseBase { |
ExtendedCoroutineHandle
该类是coroutine_handle<void>
的拓展版本,其定义如下:
1 | class ExtendedCoroutineHandle { |
其实际存储的是调用co_await
的协程的coroutine_handle
。用于执行完成当前协程函数后,唤醒原来被切换出去的协程函数。举例来说:
1 | Task<T> func { |
在这个协程函数中,当执行到co_await时,当前协程函数func
会被暂停,执行调度到co_await对应的lambda
函数中去,但是当对于的lambda
执行完成后,如何回到原来的函数呢,这个工作是由Task自己完成的,不需要用户指定,其实现的核心就是这里的continuation_
(ExtendedCoroutineHandle类),lambda函数对应的Task中的promise_type
会持有func
协程的coroutine_handle
。当lambda执行完成后,调用coroutine_handle
的resume即唤醒func
协程。
其中ExtendedCoroutinePromise
是扩展的promise,这里其仅仅充当接口(因此是纯虚类),其定义如下:
1 | class ExtendedCoroutinePromise { |
AsyncStackFrame
异步栈帧,其定义如下:
1 | // An async stack frame contains information about a particular |
其中parentFrame
代表该异步操作的调用者的栈帧,通过parentFrame
将调用栈串连起来,调用链通过一个空指针终止,对于paremtFrame
为空指针去情况,要么表示该栈帧是被分离的状态(销毁),要么表示下一帧是阻塞等待异步堆栈完成的线程(栈顶?)。
instructionPointer
表示这个栈帧调用者的指令指针。这通常是此异步操作的延续地址,或启动此异步操作的代码的地址。 如果地址未知,则可能为空。该变量的赋值通常使用FOLLY_ASYNC_STACK_RETURN_ADDRESS()
方法。
其定义如下:
1 | #define FOLLY_ASYNC_STACK_RETURN_ADDRESS() __builtin_return_address(0) |
其中__builtin_return_address
可以看__builtin_return_address。
简单来说__builtin_return_address
是编译器内建函数,作用是用于获取当前函数或者调用函数的返回地址,当参数是0时,表示的是当前函数的返回地址,参数为1时表示的是调用该函数的函数返回地址。
stackRoot
是指向当前线程栈根的指针(stack root)。通过这里cache该变量,我们就不需要通过读取一个线程纬度的数据来获取该指针了。该指针只对最顶层的栈帧有效,当一个栈帧被入栈或者出栈时,该值需要被进行拷贝到对应的栈帧上。一个例外是最底层的栈帧,如果最底层的栈帧中该值不为空,则表示指向当前被阻塞在等待一个异步线程完成的根上(指向阻塞当前线程的异步线程的根上)。在这种情况下,您可以在AsyncStackRoot
中找到有关该线程的堆栈帧的信息,并可以使用它来继续遍历堆栈帧。
AsyncStackRoot
AsyncStackRoot
包含如下内容
1 | struct AsyncStackRoot{ |
topFrame
指向事件循环或者回调调用中当前正在执行的栈帧。
nextRoot
指向当前线程堆栈上下一个事件循环上下文的指针。
stackFramePtr
指向在当前线程上注册此 AsyncStackRoot
的函数调用的堆栈帧和返回地址的指针。这通常是负责执行异步回调(通常是事件循环)的堆栈框架。初始化该值的典型方法为FOLLY_ASYNC_STACK_FRAME_POINTER()
或者setStackFrameContext()
。
returnAddress
,通过FOLLY_ASYNC_STACK_RETURN_ADDRESS()
或者setStackFrameContext()
初始化。其中FOLLY_ASYNC_STACK_RETURN_ADDRESS
方法为:
1 |
这同样是编译器内建方法,作用是获取调研函数的返回地址。
线程栈与异步栈
AsyncStackRoot
和AsyncStackRoot
将普通线程栈和异步栈串连起来,其结构大致如下:
1 | // Current Thread Stack |
AsyncStackFrame
和AsyncStackRoot
用来串连协程调用的堆栈,使得其与线程调用栈类似。每个协程存在一个AsyncStackFrame
,协程之间通过AsyncStackFrame.parentFrame
串连起来。每个线程存在一个currentThreadAsyncStackRoot
,其存储一个AsyncStackRoot
。AsyncStackRoot
的topFrame
执行当前正在执行的协程栈帧。维护这些调用关系是方便进行debug。
对这些字段的维护设计如下函数:
pushAsyncStackFrameCallerCallee
该函数在一个协程调用另一个协程时执行,构建调用者和被调者的关系,并且维护stackRoot
指针(指向线程的AsyncStackRoot
)。
其实现如下:
1 | /* |
popAsyncStackFrameCallee
该函数用于调用完成了某个协程后,将协程栈从链表中删除,其逻辑如下:
1 | // calleeFrame表示被调的协程栈 |
ScopedAsyncStackRoot
ScopedAsyncStackRoot
不是一个函数,而是一个类,其用来维护线程的AsyncStackRoot
。其定义如下:
1 | class ScopedAsyncStackRoot { |
这里的framePointer
与returnAddress
都是之前提到的编译器函数对其赋值。其构造函数为:
1 | static thread_local AsyncStackRootHolder currentThreadAsyncStackRoot; |
首先初始化一个AsyncStackRoot
,之后挺好当前线程的AsyncStackRoot
为新建的root
。析构函数为:
1 | ScopedAsyncStackRoot::~ScopedAsyncStackRoot() { |
在析构函数中还原会原来线程的AsyncStackRoot
。
成员函数activateFrame
方法为:
1 | inline void activateAsyncStackFrame( |
设置当前root
的栈顶帧。
该类是在一个线程上新起一个协程方法时被调用,folly中目前主要是resumeCoroutineWithNewAsyncStackRoot
方法使用,其实现如下:
1 | FOLLY_NOINLINE void resumeCoroutineWithNewAsyncStackRoot( |
该函数的意思是使用一个新的AsyncStackRoot
来恢复执行一个协程。并且该协程与当前线程中的协程栈没有关系,因此需要维护一个新的AsyncStackRoot
,在该协程调用完成之后,再恢复原来的AsyncStackRoot
。
CancellationToken
CancellationToken
用于向函数或者操作进行信息传递,用于取消操作。其定义如下:
1 | // A CancellationToken is an object that can be passed into an function or |
其需要配合CancellationSource
使用。这里不展开介绍,核心是CancellationSource
负责管理cancel
逻辑,其存在requestCancellation
和getToken
两个核心接口。其中requestCancellation
用于设置取消逻辑(CancellationToken
不能设置取消,只能判断是否被取消),getToken
用于生成CancellationToken
。所以通过同一个CancellationSource
生成的CancellationToken
被统一管理,当CancellationSource
被设置cancel状态时,所以的CancellationToken
都被置为cancel状态。
其中存在merge
接口,其输入是多个CancellationToken
,并生成一个新的CancellationToken
。这里的逻辑是,聚合多个CancellationToken
,有应该被置为cancel状态时,新的这个cancel就会被置为cancel状态。其内部实现是新建了一个CancellationSource
,将其与参数中的CancellationToken
对应的CancellationSource
绑定,并设置回调函数。
介绍完了成员变量的类型,我们再来看对应使用该类作为promise_type
的协程来说,其执行逻辑。
分配Coroutine state
TaskPromiseBase
自定义了分配Coroutine state
的函数。
1 | static void* operator new(std::size_t size) { |
其new
使用的是__builtin_operator_new
。delete
使用的是__builtin_operator_delete
。这里后面的函数主要作用是进行尾调用优化。具体可以参数尾调用。
懒加载
TaskPromiseBase
被默认构造。之后会获取函数返回值,这里不是在TaskPromiseBase
中实现,而是在其派生类中实现,这里不做介绍。
当创建完成返回值后,会执行initial_suspend
判断,来决定是否可以立即执行协程。其实现为懒加载,即始终不会立即执行。:
1 | suspend_always initial_suspend() noexcept { return {}; } |
co_await
时获取awaitable
和awaiter
当协程内执行co_await
时,会调用await_transform
方法,这里实现了一系列的方法:
1 | template <typename Awaitable> |
其核心是第一个函数,即实际调用的是folly::coro::co_withAsyncStack
和co_viaIfAsync
以及co_withCancellation
方法。
对于Task
来说,这几个方法都重写了,这里看一下这几个函数的默认方法。
co_withCancellation
其方法核心是将cancel与协程任务绑定,对于Task相关结构来说,其绑定是没有问题的,但是默认情况下不清楚awaitable类型没办法绑定,因此默认的改函数实现是:
1 | FOLLY_DEFINE_CPO(detail::adl::WithCancellationFunction, co_withCancellation) |
即默认情况下直接返回awaitable
。
这里,task
,TaskWithExecutor
有实现该方法(后面会介绍该类),其实现如下:
1 | friend Task co_withCancellation( |
可以看到这里是直接将cancel与协程的promise绑定。
co_viaIfAsync
co_viaIfAsync
的作用是保证调用者协程能够始终在指定的executor
(线程池)中执行。
其具体实现是对协程再通过框架封装一层框架定义的协程,在框架定义的这一层来实现当前协程被suspend
后和在其恢复时依然在原线程池上执行。其实现较为复杂,可以先阅读后面部分,对coro
协程有整体了解后再回来看其具体实现。
使用一个简单例子来进行描述:
1 | Task<void> funca() { |
在上面的伪代码中,我们希望funcb
在线程池exectutor2
中执行,但是希望funca
在线程池executor1
中执行。在执行funcb
时,当其被挂起后,执行funca
,由于funca
被指定在线程池executor1
中执行,当funca
执行完成后,恢复funcb
的执行时,如果没有co_viaIfAsync
的协助,funcb
剩下的部分也将直接在executor1
中被执行,通过co_viaIfAsync
,可以保证funcb
均在指定线程池中执行。
下面我们来详细了解其实现逻辑。
1 | FOLLY_DEFINE_CPO(detail::adl::ViaIfAsyncFunction, co_viaIfAsync) |
如果用户实现了自己的co_viaIfAsync
方法则优先调用用户自己的方法。之后如果用户实现了awaitable
的viaIfAsync
方法,则会调用该方法,否则,调用ViaIfAsyncAwaitable
。下面主要看ViaIfAsyncAwaitable
的实现。
1 | class ViaIfAsyncAwaitable { |
当用户直接co_await folly::coro::co_viaIfAsync(executor_.get_alias(),awaitable)
时,实际执行的就变成了co_await ViaIfAsyncAwaitable
了(这里假设协程的promise_type
没有await_transform
方法,这个逻辑一般是直接在await_transform
中返回ViaIfAsyncAwaitable
)。之后通过ViaIfAsyncAwaitable::co_await
方法获取awaiter
,该方法只创建一个ViaIfAsyncAwaiter
。ViaIfAsyncAwaiter
即为这里实际的awaiter
。
下面看一下ViaIfAsyncAwaiter
的实现:
1 | template <bool IsCallerAsyncStackAware, typename Awaitable> |
在创建ViaIfAsyncAwaiter
时会调用CoroutineType::create(std::move(executor))
方法和folly::coro::get_awaiter(static_cast<Awaitable&&>(awaitable))
方法。其中第一个方法的实现为:
1 | static ViaCoroutine createImpl() { co_return; } |
可以看到在执行第一个方法的时候,调用的是一个空的协程,这里就完成了对原来协程的一层封装,相当于在原协程上又封装了一层协程。该协程对应的ViaCoroutine
结构为:
1 | class ViaCoroutinePromiseBase { |
当创建ViaIfAsyncAwaiter
时,首先会创建ViaCoroutine::promise_type
。之后调用ViaCoroutine::promise_type::get_return_object
方法创建ViaCoroutine
(这时拿到当前协程的coroutine_handle
)。之后挂起。
可以看到创建ViaIfAsyncAwaiter
会起一个新的协程,并且suspend
在对viaCoroutine_
的赋值上。
创建ViaIfAsyncAwaiter
的第二个函数执行逻辑为:
1 | template < |
这里是根据Awaitable
获取到awaiter
,其实现与协程实现一致,根据是否存在co_await
函数来决定执行逻辑。
到此,完成了ViaIfAsyncAwaiter
的创建于获取。只会执行co_await
对awaiter
的操作。
首先执行await_ready
函数,其实现如下:
1 | decltype(auto) await_ready() noexcept(noexcept(awaiter_.await_ready())) { |
直接根据co_awaiter coro
的那个协程(被调协程)来决定是否ready,如果已经ready了,直接执行。
正常情况下ready都是false,此时会调用await_suspend
来触发实际执行。其逻辑为:
1 | template <typename Promise> |
这里IsCallerAsyncStackAware
是false,可以不考虑该逻辑。参数的continuation
为调用者协程的coroutine_handle
,即我们需要保证执行位置的协程。将continuation
存储到viaCoroutine_
,之后执行return awaiter_.await_suspend(viaCoroutine_.getHandle());
。将架构封装的这层协程的coroutine_handle
作为参数执行被调协程。这时,正常来说会立即执行被调协程,并且在被调协程执行完成之后,会唤醒调用者协程,这里的调用者协程就是架构封装的这一层协程。
这时才会执行完成创建ViaIfAsyncAwaiter
时的createImpl
函数。在执行完成该函数后析构该协程前,将会执行co_await ViaIfAsyncAwaiter::promise_type::final_suspend
,这里将返回ViaIfAsyncAwaiter::promise_type::FinalAwaiter
,其定义如下:
1 | struct FinalAwaiter { |
这里await_suspend
的参数是架构这层协程的coroutine_handle
。
其首先获取promise
,设置RequestContext
,之后执行scheduleContinuation
,其实现如下:
1 | void scheduleContinuation() noexcept { |
可以看到,其实现就是把continuation_
的resume
添加到指定线程池中执行,这里的continuation_
即为调用者协程的coroutine_handle
,即我们需要保证执行位置的协程。
至此完成了保证协程在指定线程池上执行的全部逻辑,可以看到,整体实现相当精妙,里面使用了C++的很多特性,值得深入研究。
co_withAsyncStack
co_withAsyncStack
与co_viaIfAsync
的作用类似,其被用于await_transform()
内部,用于在当前协程suspend
时将当前协程的调用栈信息暂存起来,在resume
时恢复,其实现也于co_viaIfAsync
类似,架构封装了一层协程实现,这里不过多介绍,详细信息可以看相关代码。
这里特别注意的是,对于希望自己维护调用栈关系的Awaitables
,可以定义tag_invoke
函数来自己控制,类似如下代码:
1 | class MyAwaitable { |
对awaiter
的处理
获取到awaiter
后,会调用其await_ready
,await_suspend
以及最后的await_resume
作为co_await
的返回结果。这里都不在TaskPromiseBase
的控制范畴,会在后续部分详细介绍。
协程结束
协程结束时,如果协程执行过程中跑出了异常,则会先执行unhandled_exception
,这里其定义不在TaskPromiseBase
而在TaskPromise
,其定义为:
1 | void unhandled_exception() noexcept { |
即设置对应result_
为异常。
如果没有异常(有异常也执行),则直接执行co_await TaskPromiseBase::final_suspend()
。这里我们看一下其定义:
1 | class FinalAwaiter { |
await_ready()
返回false表示为懒加载。因此co_await
会执行await_suspend
,这里coro
是当前协程的coroutine_handle
。
其逻辑是首先获取当前协程的promise,这里就是TaskPromise
。调用popAsyncStackFrameCallee
将当前协程从协程栈中出栈。如果存在异常,则调用continuation_的异常处理函数,并返回coroutine_handle
。
当没有异常时,返回continuation_
的coroutine_handle
。这里的continuation_
是调用者协程的coroutine_handle
,其会在后续介绍的awaiter
的await_suspend
中赋值。通过这个逻辑,实现了被调者完成处理后唤醒调用者。
协程结束时需要析构handler
,folly的实现是析构交由awaiter
来实现。
TaskPromise
Task
的promise_type
并不是直接使用TaskPromiseBase
而是使用的TaskPromise
,其在TaskPromiseBase
基础上增加一些协程的相关函数。
其定义如下:
1 | template <typename T> |
这里,其主要增加了一个Try<StorageType> result_
用来存储协程返回值。实现了异常处理函数unhandled_exception
。获取协程返回值get_return_object
1 | template <typename T> |
用户调用co_return
是设置协程返回值return_value
。这里设置的返回值会是co_await coro
的最终返回值,即awaiter
的await_resume
最后会返回该值。
TaskPromise
还有一个TaskPromise<void>
的偏例化,其主要实现了void相关的接口,这里不详细介绍。
Task
task
是folly coro的核心类,一般对于协程函数返回值都应该是Task。通过task,folly将协程调用链串连起来。其定义如下:
1 | template <typename T> |
根据之前关于promise_type
的介绍,这里当co_await Task
时,获取到的awaiter
是Task::Awaiter
。
这里其await_ready
始终返回false。一定会执行await_suspend
,这里其参数continuation
是调用者协程的coroutine_handle
,而不是co_await Task
里面的这个Task
。
在await_suspend
,首先设置promise
的continuation_
为调用者协程的coroutine_handle
,配合上面介绍的TaskPromiseBase::FinalAwaiter
实现被调协程完成后唤醒调用者协程。这里的if constexpr (detail::promiseHasAsyncFrame_v<Promise>)
为true,这里的操作是维护协程的调用栈,将当前协程的调用栈追加到调用链路中。之后返回当前协程的coroutine_handle
,这会立即执行当前协程。
当协程执行结束,会调用await_resume
获取co_await Task
的最终返回值,即co_return expr
设置的值。这里实际执行的是Try::value
。如果没有设置value,则会抛出异常,但是如果是void
值,则不会抛出异常,这是由于Try<void>
默认是有值的。因此对于返回Task<void>
的协程,可以不执行co_return
,对于其他类型的返回一定要执行co_return expr
。
对于返回Task<void>
的协程来说,一定要特别注意是,虽然可以不执行co_return
,但是一定要保证函数是协程,即至少要出现co_await
,co_return
或者co_yield
。由于Task
没有将默认构造函数delete,因此如果没有出现这三个关键字,则该函数就不是协程函数,不会按照协程的方式执行(不要问我是怎么知道的…)。例如:
1 | Task<void> a() { |
这就不是一个协程函数。
TaskWithExecutor
Task
协程默认运行在调用者线程中,但在对延迟较敏感的服务中,我们需要将不同协程执行在不同线程中,也就是一般说的M:N
模式(brpc
中bthread
也是这种模式,将m个用户态线程映射到n个实际liunx线程中,m远大于n)。为提供该功能,Task
提供接口scheduleOn
及TaskWithExecutor
类。
1 | TaskWithExecutor<T> scheduleOn(Executor::KeepAlive<> executor) && noexcept { |
接口提供一个线程池,设置改task执行在该线程池中。返回TaskWithExecutor
,其定义如下:
1 | template <typename T> |
先来看当co_await TaskWithExecutor
时返回的awaiter
执行逻辑,这里根据之前TaskPromise
描述可以推断出这里返回的Awaiter
,其执行核心为Awaiter::await_suspend
函数,其和Task对应的awaiter核心区别是执行协程的逻辑是在被丢到指定线程池中执行,同时需要维护一下ctx
,对于ctx的作用可以参考上一篇介绍future的文档:[RequestContext][https://www.yinkuiwang.cn/2023/01/08/folly%E5%BC%82%E6%AD%A5%E6%A1%86%E6%9E%B6%E4%B8%8EDAG/#RequestContext]。
这里由于被调协程会被丢到线程池中执行,因此调用者协程如果直接在被调协程后被resume
则会破坏调用者协程指定执行位置(线程池),因此需要co_viaIfAsync
函数(上面有介绍)。
函数Awaiter::await_suspend
还有一点需要注意的是,这里返回的是空,表示会挂起调用co_await
方法的协程,并返回到调用该协程的地方。加入到线程池的函数逻辑较为简单,只有如下两句:
1 | RequestContextScopeGuard contextScope{std::move(ctx)}; |
这里需要注意的是,调用folly::resumeCoroutineWithNewAsyncStackRoot(coro);
时会恢复当前task绑定的协程,如果恢复协程后,协程内部执行co_await
返空了,则调用回到folly::resumeCoroutineWithNewAsyncStackRoot(coro)
函数中的h.resume()
语句,这时体现到线程上,这个函数就执行完成了,不会出现阻塞线程的情况。
这里的还有一个InlineTryAwaitable
,似乎只有显示调用startInlineUnsafe
时会使用,其await_suspend
也是不加到线程池里直接调用,这一般是指调用者和被调者使用的是同一个线程池。
TaskWithExecutor
的另一个核心接口是start
,其含义是执行当前协程,并返回一个SemiFuture
,用户使用SemiFuture
来等待调用结束。
其核心是将协程执行状态和一个promise
绑定,当协程执行完成后,对promise的SemiFuture
赋值。这里的核心点是如何触发协程的执行,其实现方式是再加一层协程,这里就是
1 | template <typename F> |
当调用start时,最终会执行到该方法,其返回InlineTaskDetached
定义如下:
1 | struct InlineTaskDetached { |
其实现较为简单,await_transform
方法只是对原awaitable
增加了一层co_withAsyncStack
。最终的协程结束处理(FinalAwaiter
)也没干什么,只是维护了一下调用栈并且析构了一下资源。调用该函数返回InlineTaskDetached
后会立即调用其start方法。该方法直接将自己持有的协程resume
,这时就会执行cb(co_await folly::coro::co_awaitTry(std::move(task)));
,从而触发我们实际要等到的协程的执行。而返回给调用者的SemiFuture
则给用户做判断是否执行完成,当协程执行完成后,cb
函数会完成对promise
的setTry
,这时调用者获得的SemiFuture
就变成完成状态。
等待协程执行结束
folly官方文档介绍等待协程执行结束有两种方式:
- 协程调用
scheduleOn().start()
folly::coro::blockingWait(std::move(task).scheduleOn())
第一种方式在TaskWithExecutor
中已经介绍过了,这里再来看一下blockingWait
的实现。
1 | inline constexpr blocking_wait_fn blocking_wait{}; |
这里执行的主要是第一个operator
方法。其中makeRefBlockingWaitTask
定义如下:
1 | template < |
这里BlockingWaitTask
是一个协程返回值。其定义如下:
1 | template <typename T> |
当调用get方法时,会将自己持有的协程resume
。这里实际执行的就是co_await static_cast<Awaitable&&>(awaitable);
,也即我们要等到执行结束的协程。之后执行promise.wait()
等待协程执行结束。这里需要看一下promise
的实现。
1 | class BlockingWaitPromiseBase { |
其中核心在BlockingWaitPromiseBase
中,其存在一个folly::fibers::Baton
类。该类的实现使用的是futex
,可以参考该文档futex。这里不展开介绍(其实是不太会orz)。核心是一个同步原语。folly对其封装了一下,核心是两个接口,一个是post
,另一个是wait
。其中wait
用于等待同步信号,post
用于发送信号,在post发送信号前,调用wait
的线程会被阻塞,直到另一个线程发送了post
信号(有点像条件变量的感觉)。因此这里在将我们等待的协程resum后,就通过wait
接口等待协程完成。
这里有一点需要注意,正常来说BlockingWaitPromiseBase
持有的协程被恢复后,如果执行完成了,我们等待的协程应该执行结束了才对,为啥还需要使用baton
来进行同步呢?这时因为,将BlockingWaitPromiseBase
持有的协程resume,该协程不一定(在这里是大概率)表示该协程被执行完成了。当我们resume BlockingWaitPromiseBase
持有的协程时,执行co_await static_cast<Awaitable&&>(awaitable);
,当我们co_await
的协程被指定到特定线程池执行时,执行co_await
时调用的await_suspend
方法返回就是空(可以看TaskWithExecutor
的Awaiter::await_suspend
方法),这时会立即回到resume
协程的地方,这里就是回到了即回到folly::resumeCoroutineWithNewAsyncStackRoot(coro_)
里面的h.resume()
这条语句这里,接着执行后面的逻辑。而BlockingWaitPromiseBase
持有的协程被挂起,当我们等待的协程执行结束时,会重新唤醒BlockingWaitPromiseBase
持有的协程,进行后续处理。因此在这里我们使用promise.wait();
等待BlockingWaitPromiseBase
被重新唤醒并被执行完成(在执行完成时FinalAwaiter
的await_suspend
发送post
信号告知执行结束),这样才能保证我们等待的协程确定被执行完成。
clollectAll
想DAG中依赖关系一样,一个协程依赖的数据产出可能需要多个协程生成,这时,如果我们按照协程实际依赖的数据,每次都co_await
对应的协程,将会导致依赖的协程顺序被触发,串行执行,这在对耗时较为敏感的系统中是不可接受的,我们需要有统一触发多个协程并发执行的接口,这就是这一节要介绍的collectAll
接口。其传递的参数是一个Task
的list,如果task是异步的,即指定线程池执行,则所有的task会被异步执行,如果task是同步的(没有转换为TaskWithExecutor
,则会被同步执行)。
其实现如下:
1 | template < |
可以看到,collectAllRange
本身也是一个协程函数,返回值是一个Task
。因此我们调用该方法时,拿到task后,还需要co_await task
。
具体的每步执行逻辑上面都进行了注解。这里核心需要关注的是detail::Barrier
和detail::BarrierTask
。
Barrier
Barrier
是一个屏障,当所以协程执行完成后,才就绪,其定义如下。
1 | class Barrier { |
其存在三个成员变量。
count_
计数,用来记录当前还未执行完成的协程数量。continuation_
表示当所有条件就绪后需要执行唤醒的协程。asyncFrame_
用于维护协程栈。
这里的Awaiter
就是collectAll中最后我们co_await
的awaiter
。当collectAll
co_await
时,首先设置了最终要唤醒collectAll
函数,并将引用计数减一。当每个协程执行结束时,也会将引用计数减一。
BarrierTask
BarrierTask
是collectAll
中对每一个task
包的一层协程。其定义如下:
1 | class BarrierTask { |
promise_type
持有一个barrier
的指针,当我们调用BarrierTask
的start函数时,传递barrier
,将在collectAll
中创建的barrier
传递到promise_type
。调用start
函数后,就会恢复当前协程的执行。在执行完成当前协程后,final_suspend
返回FinalAwaiter
。当co_await
该awaiter
时,执行await_suspend
函数,执行barrier
的arrive
函数,将barrier
计数减一,并根据是否已经减到0了来决定是否唤醒等待的协程。
这里在collectAll
中设置barrier
为task数量+1中的+1,是collectAll调用co_await detail::UnsafeResumeInlineSemiAwaitable{barrier.arriveAndWait()};
使用。调用这个函数,会设置barrier满足条件后唤醒的collectAll
,并将当前协程挂起,等所以协程都执行结束后再唤醒该协程。
这里需要注意的是,BarrierTask
执行start函数唤醒当前协程时,会执行在collectAll
中makeTask
函数,即执行
1 | co_await co_viaIfAsync( |
如果这里的semiAwaitable
不是异步协程,则会陷入semiAwaitable
协程执行对应协程的处理逻辑,执行完成后再返回到makeTask
函数。而如果semiAwaitable
是异步协程,则执行上述语句后会立即返回到start
函数中,继续执行for循环后的其他start方法。
因此,当我们collectAll
时,为了让所有协程并发执行,一定要传异步协程,否则所有协程一样会被串行执行,和一个一个co_await
没区别(甚至因为套了一层协程更废了)。
promise&future&SharedPromise
在上一节,我们解决了一个协程依赖多个协程产生的数据。但是在实际执行中,往往还有另一个问题,如果应该协程产生的数据被多个协程依赖如何处理。根据前面的学习,我们知道,肯定不能是多个协程同时co_await
同一个task(协程正常应该只需要执行一次,并且task上线中,co_await时生成的awaiter都使用了std::exchange(t.coro_, {})
方式,不支持多次co_await)。
目前对于该需求的实现,和之前介绍的future方法实现一致,即使用SharedPromise
。具体可以参考上一篇文档:future&SharedPromise。
将协程的task绑定到一个SharedPromise
上,当某个协程依赖该协程使用数据时,从SharedPromise
中获取一个future
,co_await
该future即可。当协程执行完成后,设置SharedPromise
状态为完成状态,这时所有等待的协程都会被唤醒。
这里的关键是对co_await future
的实现,要保证在co_await future
时协程挂起不阻塞。其实现也较为简单:
1 | template <typename T> |
当co_await future
是,返回的awaiter
是detail::FutureAwaiter
,其实现如下:
1 | template <typename T> |
其实现也较为简单,当co_await future
时,如果future
已经ready,则立即继续执行就好了,因此await_ready
中先判断就绪状态。如果future未就绪,则执行await_suspend
时,在future后增加一个callback
,其逻辑是在future就绪后唤醒当前协程,同时返回空,挂起当前协程。
例子:
1 |
|
这里借助SharedPromise
,我们将协程函数设计成了可重入函数。其中使用的folly::coro::SharedMutexFair
是协程的读写锁,后面会进行介绍。其他相关逻辑都有注释介绍,这里就不一一解释了。
核心点
使用协程,并且每个协程都分配一个线程池执行的情况下,执行层面优化点在哪,其实每次的调度都是需要把协程丢到线程池队列去执行,那每一个协程的实际执行需要通过线程池的分发。虽然需要通过线程池来进行分发协程任务,但是线程池在执行协程时基本不会阻塞,这就大大减少了内核调度的开销,在线程池分发协程任务时,可能会由于使用有锁队列造成一定的线程阻塞,但大部分情况来说(协程任务不是特别碎的情况下)这部分开销相对较小,因此使用协程还是会有较大收益。
这时我们再考虑一下folly的future,其实如果我们能够完全按照规范使用future,在任务内部不要死等,而都交于future的调度,其实也不会造成线程阻塞,效率理论上来说和协程应该不会差距很多。因此只要解决当前future实现的异步方法中死等的问题即可,利用coro就很好实现了。所以folly的coro兼容future,只需要在原来任务内部future.get()
的地方改成co_await future
,并且函数返回值是task
即可,这样原来的future和现在的core其实新能应该差距不大。
避免阻塞
协程核心就是避免阻塞造成操作系统对线程的切换开销。因此如何避免阻塞就是协程库需要考虑的核心问题了,folly
对原阻塞方法都提供了相应的非阻塞方法,下面我们针对性的进行介绍。
sleep
sleep
方法时明显的阻塞调用。folly
利用事件驱动框架封装了一个非阻塞版本的sleep
。其实现如下:
1 | Task<void> sleep(HighResDuration d, Timekeeper* tk = nullptr); |
folly::coro::Baton
在介绍等待协程结束章节已经介绍过了,这里不做赘述。这里的核心是folly::futures::sleep(d, tk).toUnsafeFuture();
该方法返回一个future
。其实直接将其返回,我们co_await future
就可以了,但这里为了返回task
,直接将co_await future
的实现封装到了函数内部。
folly::futures::sleep(d, tk)
方法将超时时间加到全局的事件驱动框架中(EventBase
类),该事件驱动框架基于libevent
实现,这个没看过,不过可以参考nginx
的事件驱动框架nginx时间驱动框架。这里就不展开介绍了,后续如果有时间,可以研究一下。
IO
IO是阻塞的重灾区,对于同步io,是没有办法解决阻塞问题的,只能改成使用异步IO。Facebook开源的thrift rpc支持异步io。可以将异步io返回值设置成future
,当io完成后,设置对应的promise即可。这时在业务代码中,我们只需要co_await future
即可。
对future
支持co_await
上面已经介绍过了,这里就不再赘述了。
锁
使用一般的线程锁,当出现锁冲突时,未获取到锁的线程将会被挂起。为了避免由于锁导致的阻塞问题,folly提供了协程锁(协程锁的设计不是为了避免协程切换导致的死锁,而是为了避免协程阻塞)。这里解释上文使用过的协程锁SharedMutexFair
。
SharedMutexFair
实现时基于自旋锁和原子变量实现的,实际以原子变量状态控制锁信息,自旋锁只在读写原子变量时使用。每次读写完原子变量立即释放锁,避免阻塞。
这里只介绍核心接口及其实现。
1 | class SharedMutexFair : private folly::NonCopyableNonMovable { |
这里有一个核心结构folly::Synchronized<State, folly::SpinLock>
。这里不展开启实现细节,我们只需要知道其是同步原语即可,其持有一个state
数据,访问其中数据都应该通过auto lock = state_.contextualLock()
的lock
访问,state
里面的数据都可以通过lock
利用->
操作符直接访问到。调用auto lock = state_.contextualLock()
时,不仅获取到了对应存储的数据,同时获取了对应的folly::SpinLock
锁,即获取的数据被folly::SpinLock
锁保护。这里使用的是folly实现的自旋锁,这里也不展开介绍了,其可以理解为就是linux提供的自旋锁。自旋锁理论上是更废cpu的,但是这里为什么要是有自旋锁呢。这就要考虑自旋锁使用的场景了,自旋锁一般用于预期很快就能获取到锁的场景,这样可以避免像互斥锁一样需要将线程先挂起,再恢复的操作。这正是协程调度时所需要的,由于线程池较少,一般不会有很多锁竞争,即使有锁竞争也应该很快会获取到锁,并且要避免执行协程的线程阻塞,因此这里选取的是自旋锁。
对于可自动释放的锁来说,其实现就比不自动释放的增加了在await_resume
中返回自动释放的class,其他没啥区别。下面我们来依次介绍锁的获取与释放。
读锁
1 | inline SharedMutexFair::LockOperation<SharedMutexFair::LockSharedAwaiter> |
当co_await co_lock_shared()
时,获取的awaiter
是LockSharedAwaiter
,其定义如下:
1 | class LockSharedAwaiter : public LockAwaiterBase { |
首先尝试获取读锁,如果获取成功,则继续执行协程。如果失败,则执行await_suspend
。在await_suspend
中,再次执行一次可以直接上锁的判断,如果可以上锁,则不suspend协程。否则,记录当前协程的continuation_
,将当前协程加入到等待列表中。其中如下两句语句比较绕:
1 | *lock->waitersTailNext_ = this; |
第一句是把队尾的指针赋值为当前awaiter
,关键是第二句,这里waitersTailNext_
是一个双重指针,即LockAwaiterBase**
这里将waitersTailNext_
指向了当前awaiter的nextAwaiter_
结构,则下次再向列表中添加元素时,执行的还是这两个语句,这时,第一条语句*lock->waitersTailNext_ = this;
,就是将这一次的nextAwaiter_
赋值为指向添加的awaiter
。这样,每次对*lock->waitersTailNext_
赋值,都是在对链表最后一个awaiter
的nextAwaiter_
赋值,以此达到串连所有awaiter
的目的(妙啊)。这里还有个问题是起始指针,即State
的waitersHead_
变量,这就要再来看一下State
的初始化了:
1 | State() noexcept |
可以看到,waitersTailNext_
初始化执行waitersHead_
,则第一次执行*lock->waitersTailNext_
就是对waitersHead_
赋值(好家伙,指针是被他玩明白了)。
至此,完成了等待协程awaiter
的串连。
将等待读锁添加到等待链表后,当写锁释放时会遍历链表,对等待的协程加读锁。
释放读锁
释放读锁逻辑如下
1 | void SharedMutexFair::unlock_shared() noexcept { |
这里的核心逻辑是unlockOrGetNextWaitersToResume
函数,其作用是获取可以获得锁的列表,其逻辑如下:
1 | SharedMutexFair::LockAwaiterBase* |
可以看到,其逻辑是按照等等的头部属性来拉取满足条件的awaiter
,同时加锁。
resumeWaiters
逻辑较为简单:
1 | void SharedMutexFair::resumeWaiters(LockAwaiterBase* awaiters) noexcept { |
遍历获取的awaiters
,resume
即可。但这里有个问题,如果resume
协程后,协程串行执行,将会导致效率低下,即使协程本身绑定了executor
,也不能保证被挂起后执行依然是异步的,这时就需要使用co_viaIfAsync
方法,即在调用co_await
时,对awaiter
增加一层co_viaIfAsync
封装,这就保证协程始终时异步协程(如果executor
不为空),并且是被执行在指定的线程池上。这也是为什么返回的awaiter
都由LockOperation
包一层,因为其定义了viaIfAsync
方法。对于task
来说,这些是不必要的,但是如果是自己定义的协程promise_type
就需要注意,执行锁获取应该使用
1 | const folly::Executor::KeepAlive<> executor = co_await co_current_executor; |
避免被唤醒的协程被串行执行。
自动释放的读锁
自动释放的读锁不需要用户显示调用unlock_shared()
,在返回值的生命周期结束会自动释放,接口是:
1 | [[nodiscard]] LockOperation<ScopedLockSharedAwaiter> |
其中实现如下:
1 | class ScopedLockSharedAwaiter : public LockSharedAwaiter { |
可以看到,其相对LockSharedAwaiter
唯一区别是其增加了返回值,该返回值将mutex_
包起来,在析构时,调用释放锁的函数:
1 | ~SharedLock() { |
其他与正常读锁没什么区别。
写锁
介绍完了读锁,写锁就简单很多了。获取锁接口有两个,一个会自动释放,一个不会,这里只简单介绍不自动释放的。
获取写锁:
1 | inline SharedMutexFair::LockOperation<SharedMutexFair::LockAwaiter> |
释放写锁:
1 | void SharedMutexFair::unlock() noexcept { |
这里直接获取可以添加的队列而没有标记lockedFlagAndReaderCount_
为kUnlocked
是因为unlockOrGetNextWaitersToResume
实现时是直接对unlockOrGetNextWaitersToResume
赋值的,而不是再远基础上加减,因此没有必要执行这一步。